{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Adding a Database Trigger to a Lattice\n",
    "\n",
    "This example illustrates how to use `covalent.trigger.DatabaseTrigger` to trigger the workflow dispatches automatically after the successful execution of table reads with the conditions for N number of times.\n",
    "\n",
    "## Prerequisites\n",
    "    \n",
    "1. Install the recommended SQL drivers that support SQLAlchemy.\n",
    "2. Make sure a user with the name `postgres` exists in your PostgreSQL database (you can run `createuser postgres` command for that).\n",
    "3. Create a database called `aqdb` (you can run `createdb aqdb` command for this).\n",
    "(both the commands mentioned above come installed as part of PostgreSQL installation as tested in macOS `postgres` installation using homebrew)\n",
    "5. Import the Covalent and the DatabaseTrigger."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import covalent as ct\n",
    "from covalent.triggers import DatabaseTrigger"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Procedure\n",
    "\n",
    "1. Create a new table `test_db_trigger`. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-01-07 17:34:25,040 INFO sqlalchemy.engine.Engine select pg_catalog.version()\n",
      "2024-01-07 17:34:25,041 INFO sqlalchemy.engine.Engine [raw sql] ()\n",
      "2024-01-07 17:34:25,042 INFO sqlalchemy.engine.Engine select current_schema()\n",
      "2024-01-07 17:34:25,042 INFO sqlalchemy.engine.Engine [raw sql] ()\n",
      "2024-01-07 17:34:25,043 INFO sqlalchemy.engine.Engine show standard_conforming_strings\n",
      "2024-01-07 17:34:25,043 INFO sqlalchemy.engine.Engine [raw sql] ()\n",
      "2024-01-07 17:34:25,044 INFO sqlalchemy.engine.Engine BEGIN (implicit)\n",
      "2024-01-07 17:34:25,044 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s\n",
      "2024-01-07 17:34:25,045 INFO sqlalchemy.engine.Engine [generated in 0.00025s] ('test_db_trigger',)\n",
      "2024-01-07 17:34:25,046 INFO sqlalchemy.engine.Engine \n",
      "CREATE TABLE test_db_trigger (\n",
      "\ttrigger_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, \n",
      "\tPRIMARY KEY (trigger_at)\n",
      ")\n",
      "\n",
      "\n",
      "2024-01-07 17:34:25,046 INFO sqlalchemy.engine.Engine [no key 0.00021s] ()\n",
      "2024-01-07 17:34:25,049 INFO sqlalchemy.engine.Engine COMMIT\n"
     ]
    }
   ],
   "source": [
    "\n",
    "db_path = \"postgresql+pg8000://postgres:sam@localhost:5432/aqdb\"\n",
    "table_name = 'test_db_trigger'\n",
    "\n",
    "#create table\n",
    "\n",
    "from sqlalchemy import Table, Column, MetaData, DateTime, create_engine\n",
    "meta = MetaData()\n",
    "engine = create_engine(db_path, echo=True)\n",
    "test_db_trigger = Table(\n",
    "table_name, meta,\n",
    "Column('trigger_at', DateTime, primary_key = True),\n",
    ")\n",
    "meta.create_all(engine)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Load sample data into the newly created table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-01-07 17:34:28,071 INFO sqlalchemy.engine.Engine INSERT INTO test_db_trigger (trigger_at) VALUES (%s)\n",
      "2024-01-07 17:34:28,071 INFO sqlalchemy.engine.Engine [generated in 0.00094s] ((datetime.datetime(2024, 1, 7, 12, 34, 27, 33553),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 138484),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 243426),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 348190),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 453000),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 555778),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 655854),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 760891),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 862814),), (datetime.datetime(2024, 1, 7, 12, 34, 27, 965930),))\n",
      "2024-01-07 17:34:28,079 INFO sqlalchemy.engine.Engine COMMIT\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/var/folders/1z/64_91wwj46ng1xjffddgz_n40000gn/T/ipykernel_2625/3058673604.py:11: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to \"sqlalchemy<2.0\". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)\n",
      "  result = conn.execute(insert(test_db_trigger),[*values])\n"
     ]
    }
   ],
   "source": [
    "# load sample data.\n",
    "from sqlalchemy import insert\n",
    "from datetime import datetime\n",
    "import time\n",
    "\n",
    "with engine.connect() as conn:\n",
    "    values = []\n",
    "    for _ in range(10):\n",
    "        values.append({\"trigger_at\": datetime.now()})\n",
    "        time.sleep(0.1)\n",
    "    result = conn.execute(insert(test_db_trigger),[*values])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "3. Create a `Database Trigger` object that performs a trigger. We can parse parameters such as `db_path`, `table_name`, `trigger_after_n`, and `poll_interval`. For this illustration, we will use the `PostgreSQL` database."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "database_trigger = DatabaseTrigger(db_path='postgresql+pg8000://postgres:sam@localhost:5432/aqdb',\n",
    "                                   table_name=table_name,\n",
    "                                    trigger_after_n=2,\n",
    "                                  poll_interval=3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "4. Create a workflow:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "@ct.lattice\n",
    "@ct.electron\n",
    "def my_workflow():\n",
    "    return 42"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "5. Dispatch `my_workflow`, disabling its first execution using the `disable_run` parameter in `ct.dispatch`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "a7a74b5e-e865-430e-9a6c-258e8c51d0ed\n"
     ]
    }
   ],
   "source": [
    "dispatch_id = ct.dispatch(my_workflow)()\n",
    "print(dispatch_id)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "6. Attach the trigger to the `dispatch_id` and register it with the trigger server with the where clause to filter dispatches with lattice name `my_workflow`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "database_trigger.lattice_dispatch_id = dispatch_id\n",
    "triggered_at = values[-1][\"trigger_at\"]\n",
    "database_trigger.where_clauses = [f\"trigger_at = '{str(triggered_at)}'\"]\n",
    "database_trigger.register()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "7. Monitor the Covalent UI. Watch the Dashboard for new dispatches of `my_workflow`.\n",
    "\n",
    "8. In the Covalent UI, observe that a new `my_workflow` is dispatched after reading the table two times and with a polling interval of 3 seconds.\n",
    "\n",
    "9. To disable triggers on the dispatch, use the `ct.stop_triggers` function and drop the `test_db_trigger` table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-01-07 17:35:51,633 INFO sqlalchemy.engine.Engine BEGIN (implicit)\n",
      "2024-01-07 17:35:51,633 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s\n",
      "2024-01-07 17:35:51,634 INFO sqlalchemy.engine.Engine [cached since 86.59s ago] ('test_db_trigger',)\n",
      "2024-01-07 17:35:51,635 INFO sqlalchemy.engine.Engine \n",
      "DROP TABLE test_db_trigger\n",
      "2024-01-07 17:35:51,635 INFO sqlalchemy.engine.Engine [no key 0.00026s] ()\n",
      "2024-01-07 17:35:51,698 INFO sqlalchemy.engine.Engine COMMIT\n"
     ]
    }
   ],
   "source": [
    "ct.stop_triggers(dispatch_id)\n",
    "meta.drop_all(engine, tables=[test_db_trigger], checkfirst=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that the `stop_triggers` function disables all triggers attached to the specified dispatch. \n",
    "\n",
    "## See Also\n",
    "\n",
    "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)\n",
    "\n",
    "[Adding a TimeTrigger to a Lattice](./trigger_time.ipynb)\n",
    "\n",
    "[Adding a SQLite Trigger to a Lattice](./trigger_sqlite.ipynb)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.18"
  },
  "vscode": {
   "interpreter": {
    "hash": "ffe78875ce1aa6161f50f6a6dec2555e7255bbdb44cc39b93c0dfc1daa8da522"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
